package com.mimo.logic.room.service.impl;

import java.nio.charset.StandardCharsets;
import java.text.MessageFormat;
import java.time.Duration;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ZSetOperations.TypedTuple;
import org.springframework.data.redis.core.query.SortQuery;
import org.springframework.data.redis.core.query.SortQueryBuilder;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

import com.mimo.common.configuration.mrlock.annotation.DistributedRedisLock;
import com.mimo.common.dto.Entry;
import com.mimo.common.logic.code.StatusCode;
import com.mimo.common.logic.room.dto.RoomMemberQueryCriteria;
import com.mimo.common.result.CollectionResult;
import com.mimo.common.utils.DateUtil;
import com.mimo.common.utils.JsonUtils;
import com.mimo.logic.blocking.constants.DenyOperation;
import com.mimo.logic.blocking.service.IAccessiableService;
import com.mimo.logic.room.constants.RoomKeys;
import com.mimo.logic.room.model.RoomDefinitionInfo;
import com.mimo.logic.room.model.RoomStatisticsDTO;
import com.mimo.logic.room.service.IRoomCallback;
import com.mimo.logic.room.service.IRoomService;

/**
 * Any maintenance of a room or of the members under it must take the unified lock keyed by roomId.
 * 
 * @author Hongyu
 */
@Service
public class RoomServiceImpl implements IRoomService {

  /**
   * Grace period (one day) that getExpireRooms() compares against a room's creation time before an empty room
   * without a positive TTL score is reported as expired.
   */
  private static final Duration Room_Expired = Duration.ofDays(1);

  /**
   * Whether permanent rooms are supported. Bound to the {@code logic.room.forever} property and defaults to
   * {@code false} when the property is absent.
   */
  @Value("${logic.room.forever:false}")
  private Boolean supportRoomForever;

  /** Redis access used for every room, membership and TTL structure handled by this service. */
  @Autowired
  private RedisTemplate<String, String> redisTemplate;

  /** Receives the created / destroyed / joined / left notifications emitted by this service. */
  @Autowired
  private IRoomCallback callback;

  /** Consulted by add() to decide whether a user is globally blocked or denied from joining a room. */
  @Autowired
  private IAccessiableService userAccessiableService;

  /**
   * Tells whether a user currently belongs to a room.
   * <p>
   * Membership is decided by whether the user has a score in the room's member sorted set.
   *
   * @param roomId id of the room to inspect
   * @param userId id of the user to look for
   * @return {@code true} when the member sorted set holds a score for the user
   */
  @Override
  public boolean isMemberOf(String roomId, String userId) {
    String membersKey = MessageFormat.format(RoomKeys.LOGIC_ROOM_MEMBERS, roomId);
    Double joinScore = redisTemplate.opsForZSet().score(membersKey, userId);
    return joinScore != null;
  }

  /**
   * Registers a new room unless a definition with the same room id already exists.
   * <p>
   * In a single pipeline the definition is stored in the definition hash, the room is added to the room bucket
   * (scored by the creation timestamp), its size is initialised to 0, and its TTL score is recorded: either
   * {@code RoomDefinitionInfo.DEFAULT} or "now + ttl days" as epoch milliseconds. The created-callback is
   * invoked afterwards.
   * <p>
   * Raw keys and members are encoded explicitly as UTF-8 instead of the platform default charset, so the bytes
   * written here do not vary with the JVM's locale. NOTE(review): this presumes the injected template serializes
   * strings as UTF-8 (the default of Spring's String serializer) - confirm against the Redis configuration.
   *
   * @param definition definition of the room to create; its room id is also the lock key
   * @return {@code true} when the room was created, {@code false} when it already existed
   */
  @Override
  @DistributedRedisLock(key = RoomKeys.LOGIC_ROOM_LOCKER_PREFIX + "#{#definition.roomId}", expired = 60, waited = 60)
  public boolean create(RoomDefinitionInfo definition) {
    Date now = new Date();
    long timestamp = now.getTime();

    if (this.exists(definition.getRoomId())) {// duplicate room check
      return false;
    }

    redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
      byte[] roomId = definition.getRoomId().getBytes(StandardCharsets.UTF_8);
      byte[] ttlKey = RoomKeys.LOGIC_ROOM_TTL.getBytes(StandardCharsets.UTF_8);

      connection.hSet(RoomKeys.LOGIC_ROOM_DEFINITION.getBytes(StandardCharsets.UTF_8), roomId,
          JsonUtils.toJsonBytes(definition));
      connection.zAdd(RoomKeys.LOGIC_ROOM_BUCKET.getBytes(StandardCharsets.UTF_8), timestamp, roomId);
      connection.zAdd(RoomKeys.LOGIC_ROOM_SIZE.getBytes(StandardCharsets.UTF_8), 0, roomId);
      if (definition.getTtl() == RoomDefinitionInfo.DEFAULT) {
        // No explicit TTL: keep the sentinel value as the score.
        connection.zAdd(ttlKey, RoomDefinitionInfo.DEFAULT, roomId);
      } else {
        // Explicit TTL is expressed in days; store the absolute expiry instant in milliseconds.
        Date expiredDate = DateUtil.addField(now, Calendar.DATE, (int) definition.getTtl());
        connection.zAdd(ttlKey, expiredDate.getTime(), roomId);
      }
      return null;
    });

    // Callback notification
    callback.onCreated(definition.getRoomId());
    return true;
  }

  /**
   * Forcibly removes a room and everything stored about it.
   * <p>
   * Because deletion is a forced clean-up, every message sent inside a room has to be preceded by a check that
   * the sender is still a member of that room.
   * <p>
   * The relation between users and rooms is maintained separately, so when a room is deleted the room is also
   * removed from each former member's per-user room index.
   * <p>
   * Raw keys and members are encoded explicitly as UTF-8 instead of the platform default charset.
   * NOTE(review): this presumes the injected template serializes strings as UTF-8 - confirm against the Redis
   * configuration.
   *
   * @param roomId id of the room to delete; also the lock key
   */
  @Override
  @DistributedRedisLock(key = RoomKeys.LOGIC_ROOM_LOCKER_PREFIX + "#{#roomId}", expired = 60, waited = 60)
  public void delete(String roomId) {
    byte[] rid = roomId.getBytes(StandardCharsets.UTF_8);

    List<Object> rets = redisTemplate.executePipelined((RedisCallback<Object>) connection -> {

      byte[] roomMembersKey = MessageFormat.format(RoomKeys.LOGIC_ROOM_MEMBERS, roomId)
          .getBytes(StandardCharsets.UTF_8);

      // Read every member of the room; this must stay the FIRST pipelined command because its result is
      // picked up below as rets.get(0).
      connection.zRange(roomMembersKey, 0, -1);

      // Drop the whole member key of the room
      connection.del(roomMembersKey);

      connection.hDel(RoomKeys.LOGIC_ROOM_DEFINITION.getBytes(StandardCharsets.UTF_8), rid);
      connection.zRem(RoomKeys.LOGIC_ROOM_BUCKET.getBytes(StandardCharsets.UTF_8), rid);
      connection.zRem(RoomKeys.LOGIC_ROOM_SIZE.getBytes(StandardCharsets.UTF_8), rid);
      connection.zRem(RoomKeys.LOGIC_ROOM_TTL.getBytes(StandardCharsets.UTF_8), rid);
      return null;
    });

    // When the room is destroyed, the user-to-room relation has to be cleared explicitly
    @SuppressWarnings("unchecked")
    Set<String> members = (Set<String>) rets.get(0);
    if (!CollectionUtils.isEmpty(members)) {
      redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
        for (String m : members) {
          connection.zRem(MessageFormat.format(RoomKeys.LOGIC_ROOM_BY_USER, m).getBytes(StandardCharsets.UTF_8),
              rid);
        }
        return null;
      });
    }

    // Callback notification
    callback.onDestroy(roomId);
  }

  /**
   * Computes how many seats are still free in a room.
   *
   * @param roomId id of the room to inspect
   * @return the capacity from the stored room definition minus the current room size, or 0 when no definition
   *         text is stored for the room
   */
  private int getRoomPaddingCap(String roomId) {
    String definitionJson = redisTemplate.<String, String> opsForHash().get(RoomKeys.LOGIC_ROOM_DEFINITION, roomId);
    if (!StringUtils.hasText(definitionJson)) {
      return 0;
    }

    RoomDefinitionInfo roomDef = JsonUtils.parseJson(definitionJson, RoomDefinitionInfo.class);
    return roomDef.getCapacity() - this.getSizeByRoom(roomId);
  }

  /**
   * Adds a user to a room.
   * <p>
   * Checks run in this order: global block, join-room denial, room existence, existing membership, remaining
   * capacity. A user who is already a member gets {@code StatusCode.Success} without any change and without a
   * callback. The membership check deliberately runs BEFORE the capacity check, so that a member re-joining a
   * room that is currently full is not rejected with {@code StatusCode.RoomMemberCap}.
   * <p>
   * For a new member, one pipeline adds the user to the room's member set, increments the room size by one and
   * records the room in the user's room index (both scored by the join time); the joined-callback is invoked
   * afterwards.
   * <p>
   * Raw keys and members are encoded explicitly as UTF-8 instead of the platform default charset.
   * NOTE(review): this presumes the injected template serializes strings as UTF-8 - confirm against the Redis
   * configuration.
   *
   * @param roomId id of the room to join; also the lock key
   * @param uid id of the joining user
   * @return the status code of the failed check, or {@code StatusCode.Success}
   */
  @Override
  @DistributedRedisLock(key = RoomKeys.LOGIC_ROOM_LOCKER_PREFIX + "#{#roomId}", expired = 60, waited = 60)
  public StatusCode add(String roomId, String uid) {

    if (!userAccessiableService.checkAccessiable(uid, null, DenyOperation.GlobalBlock).isSuccess()) {// globally blocked?
      return DenyOperation.GlobalBlock.getStatusCode();
    }

    if (!userAccessiableService.checkAccessiable(uid, roomId, DenyOperation.JoinRoom).isSuccess()) {// allowed to join?
      return DenyOperation.JoinRoom.getStatusCode();
    }

    if (!this.exists(roomId)) { // the room no longer exists
      return StatusCode.RoomNotExist;
    }

    if (isMemberOf(roomId, uid)) { // already a member: nothing to do, and no seat is needed
      return StatusCode.Success;
    }

    if (getRoomPaddingCap(roomId) < 1) {// not enough remaining seats
      return StatusCode.RoomMemberCap;
    }

    redisTemplate.executePipelined((RedisCallback<Object>) connection -> {

      long jointTime = System.currentTimeMillis();
      byte[] rawRoomId = roomId.getBytes(StandardCharsets.UTF_8);
      byte[] rawUid = uid.getBytes(StandardCharsets.UTF_8);

      // Add the member to the room
      connection.zAdd(MessageFormat.format(RoomKeys.LOGIC_ROOM_MEMBERS, roomId).getBytes(StandardCharsets.UTF_8),
          jointTime, rawUid);

      // Room size +1
      connection.zIncrBy(RoomKeys.LOGIC_ROOM_SIZE.getBytes(StandardCharsets.UTF_8), 1, rawRoomId);

      // Maintain the rooms recorded under the member
      connection.zAdd(MessageFormat.format(RoomKeys.LOGIC_ROOM_BY_USER, uid).getBytes(StandardCharsets.UTF_8),
          jointTime, rawRoomId);

      return null;
    });

    // Callback notification
    callback.onJoint(roomId, uid);

    return StatusCode.Success;
  }

  /**
   * Removes a user from a room.
   * <p>
   * When the room exists and the user is a member, one pipeline removes the user from the room's member set,
   * decrements the room size by one and removes the room from the user's room index. The left-callback is
   * invoked on EVERY call, whether or not anything was removed.
   * <p>
   * Raw keys and members are encoded explicitly as UTF-8 instead of the platform default charset.
   * NOTE(review): this presumes the injected template serializes strings as UTF-8 - confirm against the Redis
   * configuration.
   *
   * @param roomId id of the room to leave; also the lock key
   * @param uid id of the leaving user
   * @return {@code true} when the user was a member and has been removed, {@code false} otherwise
   */
  @Override
  @DistributedRedisLock(key = RoomKeys.LOGIC_ROOM_LOCKER_PREFIX + "#{#roomId}", expired = 60, waited = 60)
  public boolean leave(String roomId, String uid) {

    boolean done = false;
    if (this.exists(roomId) && isMemberOf(roomId, uid)) {
      redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
        byte[] rawRoomId = roomId.getBytes(StandardCharsets.UTF_8);
        byte[] rawUid = uid.getBytes(StandardCharsets.UTF_8);

        // The member leaves the room
        connection.zRem(MessageFormat.format(RoomKeys.LOGIC_ROOM_MEMBERS, roomId).getBytes(StandardCharsets.UTF_8),
            rawUid);

        // Room size -1
        connection.zIncrBy(RoomKeys.LOGIC_ROOM_SIZE.getBytes(StandardCharsets.UTF_8), -1, rawRoomId);

        // Maintain the rooms recorded under the member
        connection.zRem(MessageFormat.format(RoomKeys.LOGIC_ROOM_BY_USER, uid).getBytes(StandardCharsets.UTF_8),
            rawRoomId);

        return null;
      });
      done = true;
    }

    // Delivery and handling of the leave message cannot be guaranteed, so clients must be allowed to call this
    // repeatedly in order to correct their local cache; hence the callback fires unconditionally.
    callback.onLeft(roomId, uid);

    return done;
  }

  /**
   * Reads the current member count of a room from the room-size sorted set.
   *
   * @param roomId id of the room to inspect
   * @return the stored score truncated to an int, or 0 when the room has no score
   */
  @Override
  public int getSizeByRoom(String roomId) {
    Double storedSize = redisTemplate.opsForZSet().score(RoomKeys.LOGIC_ROOM_SIZE, roomId);
    if (storedSize == null) {
      return 0;
    }
    return storedSize.intValue();
  }

  /**
   * Lists every member of a room.
   *
   * @param roomId id of the room to inspect
   * @return the full range (index 0 to -1) of the room's member sorted set, exactly as returned by the template
   */
  @Override
  public Set<String> getUsersByRoom(String roomId) {
    String membersKey = MessageFormat.format(RoomKeys.LOGIC_ROOM_MEMBERS, roomId);
    Set<String> allMembers = redisTemplate.opsForZSet().range(membersKey, 0, -1);
    return allMembers;
  }

  /**
   * Loads the statistics of a room: current size, all members and the room definition.
   * <p>
   * The result is empty when the definition hash holds no entry for the room. Otherwise size, members and
   * definition are fetched together in one pipeline; any of them may be missing by then, in which case the size
   * falls back to 0, the users to an empty set and the definition to {@code null}.
   * <p>
   * Raw keys and members are encoded explicitly as UTF-8 instead of the platform default charset.
   * NOTE(review): this presumes the injected template serializes strings as UTF-8 - confirm against the Redis
   * configuration.
   *
   * @param roomId id of the room to load
   * @return the room statistics, or an empty Optional when the room has no stored definition
   */
  @SuppressWarnings("unchecked")
  @Override
  public Optional<RoomStatisticsDTO> load(String roomId) {

    return Optional.ofNullable(redisTemplate.<String, String> opsForHash().get(RoomKeys.LOGIC_ROOM_DEFINITION, roomId))
        .map(df -> {
          RoomStatisticsDTO room = new RoomStatisticsDTO();
          room.setId(roomId);

          // Fetch everything in one round trip; not every piece is guaranteed to come back.
          // The command order below fixes the indexes used when reading `data`.
          List<Object> data = redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
            byte[] rid = roomId.getBytes(StandardCharsets.UTF_8);
            connection.zScore(RoomKeys.LOGIC_ROOM_SIZE.getBytes(StandardCharsets.UTF_8), rid);// current room size
            connection.zRange(
                MessageFormat.format(RoomKeys.LOGIC_ROOM_MEMBERS, roomId).getBytes(StandardCharsets.UTF_8), 0, -1);// all members
            connection.hGet(RoomKeys.LOGIC_ROOM_DEFINITION.getBytes(StandardCharsets.UTF_8), rid); // room definition
            return null;
          });

          room.setSize(Optional.ofNullable((Double) data.get(0)).orElse(0d).intValue());
          room.setUsers(Optional.ofNullable((Set<String>) data.get(1)).orElseGet(Collections::emptySet));
          room.setDefinition(Optional.ofNullable((String) data.get(2))
              .map(json -> JsonUtils.parseJson(json, RoomDefinitionInfo.class)).orElse(null));
          return room;
        });
  }

  /**
   * Tells whether a room is known, i.e. whether the definition hash has an entry for it.
   *
   * @param roomId id of the room to look for
   * @return the result of the hash-key lookup for the room id
   */
  @Override
  public boolean exists(String roomId) {
    Boolean hasDefinition = redisTemplate.<String, String> opsForHash().hasKey(RoomKeys.LOGIC_ROOM_DEFINITION, roomId);
    return hasDefinition;
  }

  /**
   * Lists the ids of every room a user has joined.
   *
   * @param userId id of the user to inspect
   * @return the full range (index 0 to -1) of the user's room sorted set, exactly as returned by the template
   */
  @Override
  public Collection<String> listJointRooms(String userId) {
    String userRoomsKey = MessageFormat.format(RoomKeys.LOGIC_ROOM_BY_USER, userId);
    Set<String> joinedRoomIds = redisTemplate.opsForZSet().range(userRoomsKey, 0, -1);
    return joinedRoomIds;
  }

  /**
   * Lists the rooms a user has joined together with each room's definition and current size.
   * <p>
   * Rooms recorded for the user whose definition is no longer stored are skipped. The returned entries carry
   * id, definition and size only; the member list is not populated here.
   *
   * @param userId id of the user to inspect
   * @return one statistics entry per joined room that still has a stored definition
   */
  @Override
  public Collection<RoomStatisticsDTO> listJointRoomsByUserId(String userId) {
    String userRoomsKey = MessageFormat.format(RoomKeys.LOGIC_ROOM_BY_USER, userId);
    Set<String> joined = redisTemplate.opsForZSet().range(userRoomsKey, 0, -1);
    Collection<String> roomIds = joined != null ? joined : Collections.<String> emptySet();

    return roomIds.stream()
        .map(roomId -> redisTemplate.<String, String> opsForHash().get(RoomKeys.LOGIC_ROOM_DEFINITION, roomId))
        .filter(json -> json != null)
        .map(json -> JsonUtils.parseJson(json, RoomDefinitionInfo.class))
        .map(definition -> {
          String roomId = definition.getRoomId();
          Double storedSize = redisTemplate.opsForZSet().score(RoomKeys.LOGIC_ROOM_SIZE, roomId);

          RoomStatisticsDTO dto = new RoomStatisticsDTO();
          dto.setId(roomId);
          dto.setDefinition(definition);
          dto.setSize(storedSize == null ? 0 : storedSize.intValue());
          return dto;
        })
        .collect(Collectors.toList());
  }

  /**
   * Collects rooms that are due for removal, looking at no more than {@code count} entries of the TTL sorted set.
   * <p>
   * When permanent rooms are supported, only rooms whose TTL score lies between 0 and the current time are
   * returned. Otherwise the scan starts at score -1, so rooms carrying a TTL score of -1 are included in the scan: rooms with a
   * positive score below the current time are expired outright, and every other scanned room is expired only
   * when its stored definition is older than {@code Room_Expired} AND its current size is below 1.
   * <p>
   * The grace period is applied in milliseconds ({@code Duration.toMillis()}), matching the units of the creation
   * timestamp and of {@code System.currentTimeMillis()}. Adding {@code getSeconds()} to a millisecond timestamp
   * would shrink the one-day grace period to 86.4 seconds.
   *
   * @param count maximum number of TTL entries to scan
   * @return ids of the rooms considered expired; empty when nothing qualifies
   */
  @Override
  public Collection<String> getExpireRooms(long count) {
    long now = System.currentTimeMillis();

    if (this.supportRoomForever) {// permanent rooms are supported: simply return whatever has passed its expiry
      return redisTemplate.opsForZSet().rangeByScoreWithScores(RoomKeys.LOGIC_ROOM_TTL, 0, now, 0, count).stream()
          .map(TypedTuple::getValue).collect(Collectors.toSet());
    }

    // Permanent rooms are NOT supported: besides the rooms past their expiry, rooms with ttl -1 are inspected
    // as well and queued for eviction once they are empty. The range therefore starts at -1.
    Set<TypedTuple<String>> tuples = redisTemplate.opsForZSet().rangeByScoreWithScores(RoomKeys.LOGIC_ROOM_TTL, -1,
        now, 0, count);
    if (CollectionUtils.isEmpty(tuples)) {
      return Collections.emptySet();
    }

    Set<String> expiredRooms = tuples.stream().filter(tuple -> tuple.getScore() > 0 && tuple.getScore() < now)
        .map(TypedTuple::getValue).collect(Collectors.toSet());

    // Everything scanned but not expired by TTL still needs the "old and empty" check.
    List<String> undecided = tuples.stream().map(TypedTuple::getValue)
        .filter(roomId -> !expiredRooms.contains(roomId)).collect(Collectors.toList());

    if (!undecided.isEmpty()) {
      long graceMillis = Room_Expired.toMillis();
      List<String> definitions = redisTemplate.<String, String> opsForHash()
          .multiGet(RoomKeys.LOGIC_ROOM_DEFINITION, undecided);

      expiredRooms.addAll(definitions.stream().filter(Objects::nonNull)
          .map(json -> JsonUtils.parseJson(json, RoomDefinitionInfo.class))
          .filter(def -> def.getCreatedDate().getTime() + graceMillis < now)
          .filter(def -> this.getSizeByRoom(def.getRoomId()) < 1).map(RoomDefinitionInfo::getRoomId)
          .collect(Collectors.toList()));
    }

    return expiredRooms;
  }

  /**
   * Lists every room found in the definition hash together with its definition and current size.
   * <p>
   * The returned entries carry id, definition and size only; the member list is not populated here.
   *
   * @return one statistics entry per entry of the definition hash
   */
  @Override
  public List<RoomStatisticsDTO> queryAllRooms() {
    return redisTemplate.<String, String> opsForHash().entries(RoomKeys.LOGIC_ROOM_DEFINITION).entrySet().stream()
        .map(entry -> {
          String roomId = entry.getKey();
          RoomDefinitionInfo definition = JsonUtils.parseJson(entry.getValue(), RoomDefinitionInfo.class);
          Double storedSize = redisTemplate.opsForZSet().score(RoomKeys.LOGIC_ROOM_SIZE, roomId);

          RoomStatisticsDTO stats = new RoomStatisticsDTO();
          stats.setId(roomId);
          // Fill in the definition
          stats.setDefinition(definition);
          stats.setSize(storedSize == null ? 0 : storedSize.intValue());
          return stats;
        })
        .collect(Collectors.toList());
  }

  /**
   * Queries the members of a room together with the score stored for each member (the join time written by
   * add()).
   * <p>
   * When the criteria name a user, at most that single member is returned. Otherwise one page of members is
   * read from the member sorted set via an unsorted SORT with a limit, and each member's score is then fetched
   * in one pipeline; members whose score is missing by then are dropped. The total reported in the result is
   * always the size of the whole member sorted set, regardless of the filter.
   * <p>
   * Raw keys and members are encoded explicitly as UTF-8 instead of the platform default charset.
   * NOTE(review): this presumes the injected template serializes strings as UTF-8 - confirm against the Redis
   * configuration.
   *
   * @param query room id plus either a user id or page / size values
   * @return the matching (member, score) entries and the total member count of the room
   */
  @Override
  public CollectionResult<Entry<String, Double>> queryRoomMembers(RoomMemberQueryCriteria query) {
    Collection<Entry<String, Double>> content = new LinkedList<>();

    String key = MessageFormat.format(RoomKeys.LOGIC_ROOM_MEMBERS, query.getRoomId());

    if (StringUtils.hasText(query.getUserId())) {
      Double joinTime = redisTemplate.opsForZSet().score(key, query.getUserId());
      if (Objects.nonNull(joinTime)) {
        content = Arrays.asList(new Entry<>(query.getUserId(), joinTime));
      }
    } else {
      PageRequest pr = PageRequest.of(query.getPage(), query.getSize());
      SortQuery<String> q = SortQueryBuilder.sort(key).noSort().limit(pr.getOffset(), pr.getPageSize()).build();
      List<String> members = redisTemplate.sort(q);

      if (!CollectionUtils.isEmpty(members)) {
        byte[] rawKey = key.getBytes(StandardCharsets.UTF_8);

        // One zScore per member, in list order, so data.get(i) belongs to members.get(i).
        List<Object> data = redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
          for (String m : members) {
            connection.zScore(rawKey, m.getBytes(StandardCharsets.UTF_8));
          }
          return null;
        });

        for (int i = 0; i < members.size(); i++) {
          content.add(new Entry<>(members.get(i), (Double) data.get(i)));
        }
        // A member may have left between the page read and the score read.
        content.removeIf(e -> Objects.isNull(e.getValue()));
      }
    }

    return new CollectionResult<>(content, redisTemplate.opsForZSet().size(key));
  }

}
